Integrate async transport with SDK #4615

Merged

Member

@srothh srothh commented Jul 23, 2025

Integrate the async transport with the rest of the SDK. Provide a new experimental option transport_async that enables the async transport if an event loop is running. Otherwise, fall back to the sync transport.
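
A minimal sketch of the selection rule described above, assuming the decision only depends on the option and on whether a loop is running in the current thread. `pick_transport_kind` is a hypothetical helper for illustration, not a function in the SDK.

```python
import asyncio


def pick_transport_kind(transport_async_enabled: bool) -> str:
    """Return "async" only when the option is on and a loop is running."""
    if not transport_async_enabled:
        return "sync"
    try:
        # Raises RuntimeError when no event loop is running in this thread.
        asyncio.get_running_loop()
    except RuntimeError:
        return "sync"  # fall back to the sync transport
    return "async"


async def _inside_loop() -> str:
    return pick_transport_kind(True)


outside = pick_transport_kind(True)    # called without a running loop
inside = asyncio.run(_inside_loop())   # called from within a running loop
```

With the option enabled, the same call yields the sync fallback outside a loop and the async path inside one.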

Furthermore, adapt the client to work with the async transport. To this end, flush and close are now non-blocking and awaitable in an async context to avoid deadlocks; close still enforces a completed flush before shutdown. To my knowledge, no background threads run flush/close, so these methods are currently not thread-safe or loop-aware for async. This can be changed if necessary.

Atexit issue: If asyncio.run() is used, the atexit integration used by the SDK runs after the event loop has already closed. This makes the async flush impossible: atexit calls client.close(), but a loop is no longer present. I attempted to apply this fix by patching the loop close in the asyncio integration, but I am unsure whether I did it correctly, whether I put it in the right spot, or whether this is a good idea at all. In my SDK test, however, it seems to fix the flush issue. Note also that this will apparently be patched in Python 3.14, as per the discussion in the linked thread.
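
To illustrate the idea of patching the loop close, here is a rough sketch that wraps `close` on a stdlib event loop so a flush coroutine runs before the loop is torn down. `flush_async` and `patch_loop_close` are hypothetical stand-ins, and this is not the code from the asyncio integration; it also assumes a pure-Python loop object that allows instance attributes.

```python
import asyncio

flushed = []


async def flush_async() -> None:
    # Hypothetical stand-in for the client's async flush.
    flushed.append("flushed")


def patch_loop_close(loop: asyncio.AbstractEventLoop) -> None:
    """Wrap loop.close so a flush runs while the loop can still execute it."""
    original_close = loop.close

    def patched_close() -> None:
        try:
            # Only flush if the loop is open and idle; close() is normally
            # called after the main coroutine has finished.
            if not loop.is_closed() and not loop.is_running():
                loop.run_until_complete(flush_async())
        finally:
            original_close()

    loop.close = patched_close  # instance-level patch, affects this loop only


loop = asyncio.new_event_loop()
patch_loop_close(loop)
loop.run_until_complete(asyncio.sleep(0))
loop.close()  # flush happens here, before the real close
```

By the time an atexit handler runs, the loop is already closed, so doing the flush inside `close` is the last point at which the loop can still run it.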

As a final note, I added event loop checking. Whenever the event loop is used, the transport and client catch RuntimeError, which is raised if the event loop has already been shut down. I am not sure whether this is a case we need to consider, but I added it for now because I did not want the transport to raise RuntimeError if the event loop is shut down while the program is running. If this should be left out for now for simplicity, I can remove it again.
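
A small sketch of the kind of guard meant here, assuming work is scheduled with `loop.create_task`, which raises RuntimeError on a closed loop. `try_schedule` and `send` are hypothetical names used only for this example.

```python
import asyncio


def try_schedule(loop, make_coro) -> bool:
    """Schedule a coroutine on the loop; return False if the loop is closed."""
    coro = make_coro()
    try:
        loop.create_task(coro)
    except RuntimeError:
        # The loop was already shut down: drop the work instead of raising.
        coro.close()  # avoids a "coroutine was never awaited" warning
        return False
    return True


async def send() -> None:
    await asyncio.sleep(0)


closed_loop = asyncio.new_event_loop()
closed_loop.close()
scheduled_on_closed = try_schedule(closed_loop, send)


async def main() -> bool:
    ok = try_schedule(asyncio.get_running_loop(), send)
    await asyncio.sleep(0.01)  # let the scheduled task finish
    return ok


scheduled_on_running = asyncio.run(main())
```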

I added the httpcore[asyncio] dependency to requirements-testing, as it is needed for the async httpcore functionality.

GH-4601

srothh added 26 commits July 21, 2025 11:44
…ted a sync transport HTTP subclass

Moved shared sync/async logic into a new superclass (HttpTransportCore), and moved sync-transport-specific code into a new subclass (BaseSyncHttpTransport), from which the current transport implementations inherit.

Fixes GH-4568
Removed an unnecessary TODO message and reverted a class name change for BaseHttpTransport.

GH-4568
Adds test coverage for the error handling path when HTTP requests return
error status codes.

GH-4568
Restore comments accidentally removed during a previous commit.
Refactored class names such that BaseHttpTransport now has the same functionality as before the hierarchy refactor

GH-4568
Add a new flush_async method in the Transport ABC. This is needed for the async transport, as calling it from the client
while preserving execution order in close will require flush to be a coroutine, not a function.

GH-4568
Move flush_async down to the specific async transport subclass. This makes more sense anyway, as
this will only be required by the async transport. If more async transports are expected,
another shared superclass can be created.

GH-4568
Add necessary type annotations to the core HttpTransport to accommodate the async transport.

GH-4568
Add an abstract base class for the background worker. This provides a shared interface for the current
threaded worker used in the sync context as well as the upcoming async task-based worker.

GH-4578
Add a new factory method instead of directly instantiating the threaded background worker.
This allows for easy extension to other types of workers, such as the upcoming task-based async worker.

GH-4578
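
The worker ABC and the factory method from the two commits above could look roughly like this. All class and method names here (`Worker`, `RecordingWorker`, `_create_worker`, and the two transport sketches) are assumptions for illustration and are not taken from the SDK source.

```python
from abc import ABC, abstractmethod
from typing import Callable


class Worker(ABC):
    """Shared interface for background workers (method names are assumptions)."""

    @abstractmethod
    def submit(self, callback: Callable[[], None]) -> bool: ...

    @abstractmethod
    def flush(self, timeout: float) -> None: ...

    @abstractmethod
    def kill(self) -> None: ...


class RecordingWorker(Worker):
    """Toy worker that runs callbacks inline; stands in for a real worker."""

    def __init__(self, kind: str) -> None:
        self.kind = kind

    def submit(self, callback: Callable[[], None]) -> bool:
        callback()
        return True

    def flush(self, timeout: float) -> None:
        pass

    def kill(self) -> None:
        pass


class SyncTransportSketch:
    def __init__(self) -> None:
        # The transport asks the factory instead of naming a worker class.
        self._worker = self._create_worker()

    def _create_worker(self) -> Worker:
        return RecordingWorker("threaded")


class AsyncTransportSketch(SyncTransportSketch):
    def _create_worker(self) -> Worker:
        # Only the factory is overridden to swap in a different worker.
        return RecordingWorker("async-task")
```

The point of the factory is that a new transport only overrides one method to change the worker type.
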
Add a new flush_async method to worker ABC. This is necessary because the async transport cannot use a
synchronous blocking flush.

GH-4578
Move the flush_async down to the concrete subclass to not break existing testing. This makes sense,
as this will only really be needed by the async worker anyway and therefore is not shared logic.

GH-4578
Coroutines have a return value, but the current function signature for the worker methods does not
accommodate this. Therefore, the signature was changed.

GH-4578
Add a new implementation of the worker interface, implementing the worker as an async task. This is
to be used by the upcoming async transport.

GH-4581
Refactor the flush method in the async worker to use the async_flush coroutine.

GH-4581
…unctions

Add a check to see whether callbacks are awaitable coroutines or plain functions, as coroutines need to be awaited.

GH-4581
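
One way such a check can be written is to call the callback and await the result only if it is awaitable. This is a sketch with hypothetical names (`run_callback`, `sync_cb`, `async_cb`), not the worker's actual code, which may test the callback itself instead.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_callback(callback: Callable[[], Any]) -> Any:
    """Call the callback and await the result only when it is awaitable."""
    result = callback()
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_cb() -> str:
    return "sync"


async def async_cb() -> str:
    return "async"


async def _demo() -> list:
    return [await run_callback(sync_cb), await run_callback(async_cb)]


results = asyncio.run(_demo())
```
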
…coroutines

Coroutines do not return None, so the callback parameter of the worker has to account for this. Previously,
only callbacks with return type None were accepted.

GH-4581
Enable concurrent callbacks on async task worker by firing them as a task rather than awaiting them. A done callback handles the necessary queue and exception logic.

GH-4581
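
As a rough illustration of firing callbacks as tasks, the sketch below starts each callback with `create_task` and lets a done callback do the queue bookkeeping and exception handling. `fire_callback` and the demo callbacks are hypothetical; the real worker's bookkeeping may differ.

```python
import asyncio


def fire_callback(callback, queue, tasks, errors) -> None:
    """Start the callback as a task instead of awaiting it in the worker loop."""
    task = asyncio.get_running_loop().create_task(callback())
    tasks.add(task)  # keep a strong reference until the task is done

    def _on_done(done: asyncio.Task) -> None:
        tasks.discard(done)
        if not done.cancelled() and done.exception() is not None:
            errors.append(done.exception())
        queue.task_done()  # queue accounting happens when the task finishes

    task.add_done_callback(_on_done)


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    tasks: set = set()
    errors: list = []
    order: list = []

    async def slow() -> None:
        await asyncio.sleep(0.05)
        order.append("slow")

    async def fast() -> None:
        order.append("fast")

    async def boom() -> None:
        raise ValueError("send failed")

    for cb in (slow, fast, boom):
        queue.put_nowait(cb)
    while not queue.empty():
        fire_callback(queue.get_nowait(), queue, tasks, errors)
    await queue.join()  # returns once every done callback has run
    return order, errors, tasks


order, errors, tasks = asyncio.run(demo())
```

Because the slow callback no longer blocks the worker, the fast one finishes first even though it was queued later.
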
Changed kill to also use the _TERMINATOR sentinel, so on kill the queue is still drained up to that point instead of being cancelled immediately. This should also fix potential race conditions with flush_async.

GH-4581
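
The sentinel pattern can be sketched as follows: kill enqueues a marker object, and the consumer processes everything queued before it and then stops. Only the name `_TERMINATOR` comes from the commit message; `consume` and the demo are assumptions for illustration.

```python
import asyncio

_TERMINATOR = object()  # sentinel marking where the worker should stop


async def consume(queue: asyncio.Queue, processed: list) -> None:
    while True:
        item = await queue.get()
        if item is _TERMINATOR:
            queue.task_done()
            return  # everything queued before the sentinel was handled
        processed.append(item)
        queue.task_done()


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    for i in range(3):
        queue.put_nowait(i)
    queue.put_nowait(_TERMINATOR)  # what a sentinel-based kill would enqueue
    queue.put_nowait(99)           # submitted after kill: never processed
    await consume(queue, processed)
    return processed


processed = asyncio.run(demo())
```
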
Add proper type annotation to worker task list to fix linting problems

GH-4581
Add an implementation of Transport to work with the async background worker and HTTPCore async.

GH-4582
The async transport now properly checks for the presence of the event loop in capture_envelope, and drops items
if the event loop is no longer running for some reason.

GH-4582
Implement a kill method that properly shuts down the async transport. The httpcore async connection pool needs
to be explicitly shut down at the end of its usage.

GH-4582
Fix type errors resulting from async override and missing type definition in the async transport.

GH-4582

codecov bot commented Jul 23, 2025

❌ 17 Tests Failed:

Tests completed: 22072 | Failed: 17 | Passed: 22055 | Skipped: 1231
View the top 3 failed test(s) by shortest run time
tests.integrations.redis.test_redis_cache_module::test_no_cache_basic
Stack Traces | 0.087s run time
.../integrations/redis/test_redis_cache_module.py:26: in test_no_cache_basic
    connection = FakeStrictRedis()
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'
tests.integrations.redis.test_redis::test_redis_pipeline[False-False-expected_first_ten0]
Stack Traces | 0.089s run time
.../integrations/redis/test_redis.py:62: in test_redis_pipeline
    connection = FakeStrictRedis()
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'
tests.integrations.redis.test_redis_cache_module::test_cache_data
Stack Traces | 0.089s run time
.../integrations/redis/test_redis_cache_module.py:125: in test_cache_data
    connection = FakeStrictRedis(host="mycacheserver.io", port=6378)
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'


Add a try/except so that kill fails silently if the event loop shuts down.

GH-4582
@srothh srothh force-pushed the srothh/async-transport-integration branch from e5ea8e9 to 57ea658 Compare July 24, 2025 07:47
@srothh srothh marked this pull request as ready for review July 24, 2025 07:58
@srothh srothh force-pushed the srothh/async-transport branch from 295a0e9 to 6cb72ad Compare July 31, 2025 12:11
Member

@antonpirker antonpirker left a comment

looks great now.

ignore the linting problem, we need to fix this in master and potel-base branches.

sl0thentr0py and others added 3 commits August 12, 2025 17:32
Refactor transport code based on PR suggestions. Furthermore, add a requirement for the async extra in setup.py

GH-4582

Base automatically changed from srothh/async-transport to srothh/transport-class-hierarchy August 13, 2025 16:21

# Shutdown the components anyway
self._close_components()
self._close_transport()
logger.warning("Event loop not running, aborting close.")
Member

Suggested change
- logger.warning("Event loop not running, aborting close.")
+ logger.debug("Event loop not running, aborting close.")

Member Author

I think these logging statements can be removed entirely with the sync/async separation. The async versions are coroutines and therefore do not need to spawn a task, so they will not run into this error in the client.

self._flush_async(timeout, callback)
)
except RuntimeError:
logger.warning("Event loop not running, aborting flush.")
Member

Suggested change
- logger.warning("Event loop not running, aborting flush.")
+ logger.debug("Event loop not running, aborting flush.")

Member Author

See above

Member

@sl0thentr0py sl0thentr0py left a comment

one unused method, then we merge!

@@ -917,6 +924,22 @@ def get_integration(

return self.integrations.get(integration_name)

def _close_transport(self) -> Optional[asyncio.Task[None]]:
Member

this is no longer used

Member Author

Thanks, missed that one. It is removed now.

@sl0thentr0py sl0thentr0py merged commit 9a2c80c into srothh/transport-class-hierarchy Aug 14, 2025
108 of 110 checks passed
@sl0thentr0py sl0thentr0py deleted the srothh/async-transport-integration branch August 14, 2025 13:31

except Exception:
    if not loop.is_closed():
        loop.close()

Bug: Test Mismatch: Close vs Close_Async

The test_loop_close_flushes_async_transport test mocks client.close, but the AsyncioIntegration's patched loop close functionality calls client.close_async(). This mismatch means the test isn't verifying the intended behavior, and its assertions on client.close are incorrect.
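
A sketch of what asserting on the right method could look like, using `unittest.mock.AsyncMock` for the coroutine. `close_on_loop_shutdown` is a hypothetical stand-in for the patched loop close, and the mocked client is not the SDK's client class.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def close_on_loop_shutdown(client) -> None:
    # Hypothetical stand-in for the patched loop close, which awaits
    # close_async() rather than calling the sync close().
    await client.close_async()


client = MagicMock()
client.close_async = AsyncMock()  # awaitable mock for the coroutine method

asyncio.run(close_on_loop_shutdown(client))

# Assert on the method that is actually awaited, not on client.close.
client.close_async.assert_awaited_once()
client.close.assert_not_called()
```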
